{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "9610be80",
   "metadata": {
    "id": "9610be80"
   },
   "source": [
    "# Usage & Cost Admin API Cookbook\n",
    "\n",
    "**A practical guide to programmatically accessing your Claude API usage and cost data**"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "69b800fc",
   "metadata": {
    "id": "69b800fc"
   },
   "source": [
    "### What You Can Do\n",
    "\n",
    "**Usage Tracking:**\n",
    "- Monitor token consumption (uncached input, output, cache creation/reads)\n",
    "- Track usage across models, workspaces, and API keys\n",
    "- Analyze cache efficiency and server tool usage\n",
    "\n",
    "**Cost Analysis:**\n",
    "- Retrieve detailed cost breakdowns by service type\n",
    "- Monitor spending trends across workspaces\n",
    "- Generate reports for finance and chargeback scenarios\n",
    "\n",
    "**Common Use Cases:**\n",
    "- **Usage Monitoring**: Track consumption patterns and optimize costs\n",
    "- **Cost Attribution**: Allocate expenses across teams/projects by workspace\n",
    "- **Cache Analysis**: Measure and improve cache efficiency\n",
    "- **Financial Reporting**: Generate executive summaries and budget reports\n",
    "\n",
    "### API Overview\n",
    "\n",
    "Two main endpoints:\n",
    "1. **Messages Usage API**: Token-level usage data with flexible grouping\n",
    "2. **Cost API**: Financial data in USD with service breakdowns\n",
    "\n",
    "### Prerequisites & Security\n",
    "\n",
    "- **Admin API Key**: Get from [Claude Console](https://console.anthropic.com/settings/admin-keys) (format: `sk-ant-admin...`)\n",
    "- **Security**: Store keys in environment variables, rotate regularly, never commit to version control"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "edd50a16",
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "edd50a16",
    "outputId": "68f38db3-48ee-429f-cd6e-2d2ae74e009a"
   },
   "outputs": [],
   "source": "import os\nfrom datetime import datetime, time, timedelta\nfrom typing import Any\n\nimport requests\n\n\nclass AnthropicAdminAPI:\n    \"\"\"Secure wrapper for Anthropic Admin API endpoints.\"\"\"\n\n    def __init__(self, api_key: str | None = None):\n        self.api_key = api_key or os.getenv(\"ANTHROPIC_ADMIN_API_KEY\")\n        if not self.api_key:\n            raise ValueError(\n                \"Admin API key required. Set ANTHROPIC_ADMIN_API_KEY environment variable.\"\n            )\n\n        if not self.api_key.startswith(\"sk-ant-admin\"):\n            raise ValueError(\"Invalid Admin API key format.\")\n\n        self.base_url = \"https://api.anthropic.com/v1/organizations\"\n        self.headers = {\n            \"anthropic-version\": \"2023-06-01\",\n            \"x-api-key\": self.api_key,\n            \"Content-Type\": \"application/json\",\n        }\n\n    def _make_request(self, endpoint: str, params: dict[str, Any]) -> dict[str, Any]:\n        \"\"\"Make authenticated request with basic error handling.\"\"\"\n        url = f\"{self.base_url}/{endpoint}\"\n\n        try:\n            response = requests.get(url, headers=self.headers, params=params, timeout=30)\n            response.raise_for_status()\n            return response.json()\n        except requests.exceptions.HTTPError as e:\n            if response.status_code == 401:\n                raise ValueError(\"Invalid API key or insufficient permissions\") from e\n            elif response.status_code == 429:\n                raise requests.exceptions.RequestException(\n                    \"Rate limit exceeded - try again later\"\n                ) from e\n            else:\n                raise requests.exceptions.RequestException(f\"API error: {e}\") from e\n\n\n# Test connection\ndef test_connection():\n    try:\n        client = AnthropicAdminAPI()\n\n        # Simple test query - snap to start of day to align with bucket boundaries\n        params = {\n            \"starting_at\": (\n                datetime.combine(datetime.utcnow(), time.min) - timedelta(days=1)\n            ).strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n            \"ending_at\": datetime.combine(datetime.utcnow(), time.min).strftime(\n                \"%Y-%m-%dT%H:%M:%SZ\"\n            ),\n            \"bucket_width\": \"1d\",\n            \"limit\": 1,\n        }\n\n        client._make_request(\"usage_report/messages\", params)\n        print(\"✅ Connection successful!\")\n        return client\n\n    except Exception as e:\n        print(f\"❌ Connection failed: {e}\")\n        return None\n\n\nclient = test_connection()"
  },
  {
   "cell_type": "markdown",
   "id": "781a0721",
   "metadata": {
    "id": "781a0721"
   },
   "source": [
    "## Basic Usage & Cost Tracking\n",
    "\n",
    "### Understanding Usage Data\n",
    "\n",
    "The Messages Usage API provides token consumption in **time buckets** - fixed intervals containing aggregated usage.\n",
    "\n",
    "**Key Metrics:**\n",
    "- **uncached_input_tokens**: New input tokens (prompts, system messages)\n",
    "- **output_tokens**: Claude's responses\n",
    "- **cache_creation**: Tokens cached for reuse\n",
    "- **cache_read_input_tokens**: Previously cached tokens reused\n",
    "\n",
    "### Basic Usage Query"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "f9b26143",
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "f9b26143",
    "outputId": "c60b91c0-084d-4629-eddd-cf0fb941e042"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "📊 Usage Summary:\n",
      "Uncached input tokens: 267,751\n",
      "Output tokens: 2,848,746\n",
      "Cache creation: 0\n",
      "Cache reads: 0\n",
      "Cache efficiency: 0.0%\n",
      "Web searches: 0\n"
     ]
    }
   ],
   "source": [
    "def get_daily_usage(client, days_back=7):\n",
    "    \"\"\"Get usage data for the last N days.\"\"\"\n",
    "    end_time = datetime.combine(datetime.utcnow(), time.min)\n",
    "    start_time = end_time - timedelta(days=days_back)\n",
    "\n",
    "    params = {\n",
    "        \"starting_at\": start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n",
    "        \"ending_at\": end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n",
    "        \"bucket_width\": \"1d\",\n",
    "        \"limit\": days_back,\n",
    "    }\n",
    "\n",
    "    return client._make_request(\"usage_report/messages\", params)\n",
    "\n",
    "\n",
    "def analyze_usage_data(response):\n",
    "    \"\"\"Process and display usage data.\"\"\"\n",
    "    if not response or not response.get(\"data\"):\n",
    "        print(\"No usage data found.\")\n",
    "        return\n",
    "\n",
    "    total_uncached_input = total_output = total_cache_creation = 0\n",
    "    total_cache_reads = total_web_searches = 0\n",
    "    daily_data = []\n",
    "\n",
    "    for bucket in response[\"data\"]:\n",
    "        date = bucket[\"starting_at\"][:10]\n",
    "\n",
    "        # Sum all results in bucket\n",
    "        bucket_uncached = bucket_output = bucket_cache_creation = 0\n",
    "        bucket_cache_reads = bucket_web_searches = 0\n",
    "\n",
    "        for result in bucket[\"results\"]:\n",
    "            bucket_uncached += result.get(\"uncached_input_tokens\", 0)\n",
    "            bucket_output += result.get(\"output_tokens\", 0)\n",
    "\n",
    "            cache_creation = result.get(\"cache_creation\", {})\n",
    "            bucket_cache_creation += cache_creation.get(\n",
    "                \"ephemeral_1h_input_tokens\", 0\n",
    "            ) + cache_creation.get(\"ephemeral_5m_input_tokens\", 0)\n",
    "            bucket_cache_reads += result.get(\"cache_read_input_tokens\", 0)\n",
    "\n",
    "            server_tools = result.get(\"server_tool_use\", {})\n",
    "            bucket_web_searches += server_tools.get(\"web_search_requests\", 0)\n",
    "\n",
    "        daily_data.append(\n",
    "            {\n",
    "                \"date\": date,\n",
    "                \"uncached_input_tokens\": bucket_uncached,\n",
    "                \"output_tokens\": bucket_output,\n",
    "                \"cache_creation\": bucket_cache_creation,\n",
    "                \"cache_reads\": bucket_cache_reads,\n",
    "                \"web_searches\": bucket_web_searches,\n",
    "                \"total_tokens\": bucket_uncached + bucket_output,\n",
    "            }\n",
    "        )\n",
    "\n",
    "        # Add to totals\n",
    "        total_uncached_input += bucket_uncached\n",
    "        total_output += bucket_output\n",
    "        total_cache_creation += bucket_cache_creation\n",
    "        total_cache_reads += bucket_cache_reads\n",
    "        total_web_searches += bucket_web_searches\n",
    "\n",
    "    # Calculate cache efficiency\n",
    "    total_input_tokens = total_uncached_input + total_cache_creation + total_cache_reads\n",
    "    cache_efficiency = (\n",
    "        (total_cache_reads / total_input_tokens * 100) if total_input_tokens > 0 else 0\n",
    "    )\n",
    "\n",
    "    # Display summary\n",
    "    print(\"📊 Usage Summary:\")\n",
    "    print(f\"Uncached input tokens: {total_uncached_input:,}\")\n",
    "    print(f\"Output tokens: {total_output:,}\")\n",
    "    print(f\"Cache creation: {total_cache_creation:,}\")\n",
    "    print(f\"Cache reads: {total_cache_reads:,}\")\n",
    "    print(f\"Cache efficiency: {cache_efficiency:.1f}%\")\n",
    "    print(f\"Web searches: {total_web_searches:,}\")\n",
    "\n",
    "    return daily_data\n",
    "\n",
    "\n",
    "# Example usage\n",
    "if client:\n",
    "    usage_response = get_daily_usage(client, days_back=7)\n",
    "    daily_usage = analyze_usage_data(usage_response)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e3164089",
   "metadata": {
    "id": "e3164089"
   },
   "source": [
    "## Basic Cost Tracking\n",
    "\n",
    "Note: Priority Tier costs use a different billing model and will never appear in the cost endpoint. You can track Priority Tier usage in the usage endpoint, but not costs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "SP4zefdtF0ft",
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "SP4zefdtF0ft",
    "outputId": "a1daa8d0-f36f-474b-9967-6fe00a0efe52"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "💰 Cost Summary:\n",
      "Total cost: $83.7574\n",
      "Average daily cost: $11.9653\n"
     ]
    }
   ],
   "source": [
    "def get_daily_costs(client, days_back=7):\n",
    "    \"\"\"Get cost data for the last N days.\"\"\"\n",
    "    end_time = datetime.combine(datetime.utcnow(), time.min)\n",
    "    start_time = end_time - timedelta(days=days_back)\n",
    "\n",
    "    params = {\n",
    "        \"starting_at\": start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n",
    "        \"ending_at\": end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n",
    "        \"bucket_width\": \"1d\",  # Only 1d supported for cost API\n",
    "        \"limit\": min(days_back, 31),  # Max 31 days per request\n",
    "    }\n",
    "\n",
    "    return client._make_request(\"cost_report\", params)\n",
    "\n",
    "\n",
    "def analyze_cost_data(response):\n",
    "    \"\"\"Process and display cost data.\"\"\"\n",
    "    if not response or not response.get(\"data\"):\n",
    "        print(\"No cost data found.\")\n",
    "        return\n",
    "\n",
    "    total_cost_minor_units = 0\n",
    "    daily_costs = []\n",
    "\n",
    "    for bucket in response[\"data\"]:\n",
    "        date = bucket[\"starting_at\"][:10]\n",
    "\n",
    "        # Sum all costs in this bucket\n",
    "        bucket_cost = 0\n",
    "        for result in bucket[\"results\"]:\n",
    "            # Convert string amounts to float if needed\n",
    "            amount = result.get(\"amount\", 0)\n",
    "            if isinstance(amount, str):\n",
    "                try:\n",
    "                    amount = float(amount)\n",
    "                except (ValueError, TypeError):\n",
    "                    amount = 0\n",
    "            bucket_cost += amount\n",
    "\n",
    "        daily_costs.append(\n",
    "            {\n",
    "                \"date\": date,\n",
    "                \"cost_minor_units\": bucket_cost,\n",
    "                \"cost_usd\": bucket_cost / 100,  # Convert to dollars\n",
    "            }\n",
    "        )\n",
    "\n",
    "        total_cost_minor_units += bucket_cost\n",
    "\n",
    "    total_cost_usd = total_cost_minor_units / 100\n",
    "\n",
    "    print(\"💰 Cost Summary:\")\n",
    "    print(f\"Total cost: ${total_cost_usd:.4f}\")\n",
    "    print(f\"Average daily cost: ${total_cost_usd / len(daily_costs):.4f}\")\n",
    "\n",
    "    return daily_costs\n",
    "\n",
    "\n",
    "# Example usage\n",
    "if client:\n",
    "    cost_response = get_daily_costs(client, days_back=7)\n",
    "    daily_costs = analyze_cost_data(cost_response)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6ce76fc4",
   "metadata": {
    "id": "6ce76fc4"
   },
   "source": [
    "## Grouping, Filtering & Pagination\n",
    "\n",
    "### Time Granularity Options\n",
    "\n",
    "**Usage API** supports three granularities:\n",
    "- `1m` (1 minute): High-resolution analysis, max 1440 buckets per request\n",
    "- `1h` (1 hour): Medium-resolution, max 168 buckets per request  \n",
    "- `1d` (1 day): Daily analysis, max 31 buckets per request\n",
    "\n",
    "**Cost API** supports:\n",
    "- `1d` (1 day): Only option available, max 31 buckets per request\n",
    "\n",
    "### Grouping and Filtering"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "87f1a7ac",
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "87f1a7ac",
    "outputId": "3266318c-8af4-4647-9360-b133ffa82452"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "📊 Usage by Model:\n",
      "  claude-3-5-haiku-20241022: 995,781 tokens\n",
      "  claude-sonnet-4-5: 861,880 tokens\n",
      "  claude-opus-4-1: 394,646 tokens\n",
      "  claude-sonnet-4-5: 356,766 tokens\n",
      "  claude-opus-4-20250514: 308,223 tokens\n",
      "  claude-opus-4-1: 199,201 tokens\n",
      "Found 7 days of filtered usage data\n"
     ]
    }
   ],
   "source": [
    "def get_usage_by_model(client, days_back=7):\n",
    "    \"\"\"Get usage data grouped by model, handling pagination automatically.\"\"\"\n",
    "    end_time = datetime.combine(datetime.utcnow(), time.min)\n",
    "    start_time = end_time - timedelta(days=days_back)\n",
    "\n",
    "    params = {\n",
    "        \"starting_at\": start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n",
    "        \"ending_at\": end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n",
    "        \"group_by[]\": [\"model\"],\n",
    "        \"bucket_width\": \"1d\",\n",
    "    }\n",
    "\n",
    "    # Aggregate across all pages of data\n",
    "    model_usage = {}\n",
    "    page_count = 0\n",
    "    max_pages = 10  # Reasonable limit to avoid infinite loops\n",
    "\n",
    "    try:\n",
    "        next_page = None\n",
    "\n",
    "        while page_count < max_pages:\n",
    "            current_params = params.copy()\n",
    "            if next_page:\n",
    "                current_params[\"page\"] = next_page\n",
    "\n",
    "            response = client._make_request(\"usage_report/messages\", current_params)\n",
    "            page_count += 1\n",
    "\n",
    "            # Process this page's data\n",
    "            for bucket in response.get(\"data\", []):\n",
    "                for result in bucket.get(\"results\", []):\n",
    "                    model = result.get(\"model\", \"Unknown\")\n",
    "                    uncached = result.get(\"uncached_input_tokens\", 0)\n",
    "                    output = result.get(\"output_tokens\", 0)\n",
    "                    cache_creation = result.get(\"cache_creation\", {})\n",
    "                    cache_creation_tokens = cache_creation.get(\n",
    "                        \"ephemeral_1h_input_tokens\", 0\n",
    "                    ) + cache_creation.get(\"ephemeral_5m_input_tokens\", 0)\n",
    "                    cache_reads = result.get(\"cache_read_input_tokens\", 0)\n",
    "                    tokens = uncached + output + cache_creation_tokens + cache_reads\n",
    "\n",
    "                    if model not in model_usage:\n",
    "                        model_usage[model] = 0\n",
    "                    model_usage[model] += tokens\n",
    "\n",
    "            # Check if there's more data\n",
    "            if not response.get(\"has_more\", False):\n",
    "                break\n",
    "\n",
    "            next_page = response.get(\"next_page\")\n",
    "            if not next_page:\n",
    "                break\n",
    "\n",
    "    except Exception as e:\n",
    "        print(f\"❌ Error retrieving usage data: {e}\")\n",
    "        return {}\n",
    "\n",
    "    # Display results\n",
    "    print(\"📊 Usage by Model:\")\n",
    "    if not model_usage:\n",
    "        print(f\"  No usage data found in the last {days_back} days\")\n",
    "        print(\"  💡 Try increasing the time range or check if you have recent API usage\")\n",
    "    else:\n",
    "        for model, tokens in sorted(model_usage.items(), key=lambda x: x[1], reverse=True):\n",
    "            print(f\"  {model}: {tokens:,} tokens\")\n",
    "\n",
    "    return model_usage\n",
    "\n",
    "\n",
    "def filter_usage_example(client):\n",
    "    \"\"\"Example of filtering usage data.\"\"\"\n",
    "    params = {\n",
    "        \"starting_at\": (datetime.combine(datetime.utcnow(), time.min) - timedelta(days=7)).strftime(\n",
    "            \"%Y-%m-%dT%H:%M:%SZ\"\n",
    "        ),\n",
    "        \"ending_at\": datetime.combine(datetime.utcnow(), time.min).strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n",
    "        \"models[]\": [\"claude-sonnet-4-5\"],  # Filter to specific model\n",
    "        \"service_tiers[]\": [\"standard\"],  # Filter to standard tier\n",
    "        \"bucket_width\": \"1d\",\n",
    "    }\n",
    "\n",
    "    response = client._make_request(\"usage_report/messages\", params)\n",
    "    print(f\"Found {len(response.get('data', []))} days of filtered usage data\")\n",
    "    return response\n",
    "\n",
    "\n",
    "# Example usage\n",
    "if client:\n",
    "    model_usage = get_usage_by_model(client, days_back=14)\n",
    "    filtered_usage = filter_usage_example(client)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "487bc11d",
   "metadata": {
    "id": "487bc11d"
   },
   "source": [
    "### Pagination for Large Datasets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "d7cc5437",
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "d7cc5437",
    "outputId": "e7875fb4-09ae-44d2-f5fc-12a4cbdcce5a"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "📥 Fetching paginated data...\n",
      "  Page 1: 24 time buckets\n",
      "  Page 2: 24 time buckets\n",
      "  Page 3: 24 time buckets\n",
      "✅ Complete: Retrieved all data in 3 pages\n",
      "📊 Total retrieved: 72 time buckets\n",
      "📈 Total tokens across all data: 1,336,287\n"
     ]
    }
   ],
   "source": [
    "def fetch_all_usage_data(client, params, max_pages=10):\n",
    "    \"\"\"Fetch all paginated usage data.\"\"\"\n",
    "    all_data = []\n",
    "    page_count = 0\n",
    "    next_page = None\n",
    "\n",
    "    print(\"📥 Fetching paginated data...\")\n",
    "\n",
    "    while page_count < max_pages:\n",
    "        current_params = params.copy()\n",
    "        if next_page:\n",
    "            current_params[\"page\"] = next_page\n",
    "\n",
    "        try:\n",
    "            response = client._make_request(\"usage_report/messages\", current_params)\n",
    "\n",
    "            if not response or not response.get(\"data\"):\n",
    "                break\n",
    "\n",
    "            page_data = response[\"data\"]\n",
    "            all_data.extend(page_data)\n",
    "            page_count += 1\n",
    "\n",
    "            print(f\"  Page {page_count}: {len(page_data)} time buckets\")\n",
    "\n",
    "            if not response.get(\"has_more\", False):\n",
    "                print(f\"✅ Complete: Retrieved all data in {page_count} pages\")\n",
    "                break\n",
    "\n",
    "            next_page = response.get(\"next_page\")\n",
    "            if not next_page:\n",
    "                break\n",
    "\n",
    "        except Exception as e:\n",
    "            print(f\"❌ Error on page {page_count + 1}: {e}\")\n",
    "            break\n",
    "\n",
    "    print(f\"📊 Total retrieved: {len(all_data)} time buckets\")\n",
    "    return all_data\n",
    "\n",
    "\n",
    "def large_dataset_example(client, days_back=3):\n",
    "    \"\"\"Example of handling a large dataset with pagination.\"\"\"\n",
    "    # Use recent time range to ensure we have data\n",
    "    start_time = datetime.combine(datetime.utcnow(), time.min) - timedelta(days=days_back)\n",
    "    end_time = datetime.combine(datetime.utcnow(), time.min)\n",
    "\n",
    "    params = {\n",
    "        \"starting_at\": start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n",
    "        \"ending_at\": end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n",
    "        \"bucket_width\": \"1h\",  # Hourly data for more buckets\n",
    "        \"group_by[]\": [\"model\"],\n",
    "        \"limit\": 24,  # One day per page\n",
    "    }\n",
    "\n",
    "    all_buckets = fetch_all_usage_data(client, params, max_pages=5)\n",
    "\n",
    "    # Process the large dataset\n",
    "    if all_buckets:\n",
    "        total_tokens = sum(\n",
    "            sum(\n",
    "                result.get(\"uncached_input_tokens\", 0) + result.get(\"output_tokens\", 0)\n",
    "                for result in bucket[\"results\"]\n",
    "            )\n",
    "            for bucket in all_buckets\n",
    "        )\n",
    "        print(f\"📈 Total tokens across all data: {total_tokens:,}\")\n",
    "\n",
    "    return all_buckets\n",
    "\n",
    "\n",
    "# Example usage - use shorter time range to find recent data\n",
    "if client:\n",
    "    large_dataset = large_dataset_example(client, days_back=3)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d55bbb55",
   "metadata": {
    "id": "d55bbb55"
   },
   "source": [
    "## Simple Data Export\n",
    "\n",
    "### CSV Export for External Analysis"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "f365296a",
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "f365296a",
    "outputId": "87b4a733-6f90-4b8d-a109-7a23281b2a4d"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "✅ Exported 36 rows to my_usage_data.csv\n",
      "✅ Exported 72 cost records to my_cost_data.csv\n"
     ]
    }
   ],
   "source": [
    "import csv\n",
    "\n",
    "\n",
    "def export_usage_to_csv(client, output_file=\"usage_data.csv\", days_back=30):\n",
    "    \"\"\"Export usage data to CSV for external analysis.\"\"\"\n",
    "\n",
    "    end_time = datetime.combine(datetime.utcnow(), time.min)\n",
    "    start_time = end_time - timedelta(days=days_back)\n",
    "\n",
    "    params = {\n",
    "        \"starting_at\": start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n",
    "        \"ending_at\": end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n",
    "        \"group_by[]\": [\"model\", \"service_tier\", \"workspace_id\"],\n",
    "        \"bucket_width\": \"1d\",\n",
    "    }\n",
    "\n",
    "    try:\n",
    "        # Collect all data across pages\n",
    "        rows = []\n",
    "        page_count = 0\n",
    "        max_pages = 20  # Allow more pages for export\n",
    "        next_page = None\n",
    "\n",
    "        while page_count < max_pages:\n",
    "            current_params = params.copy()\n",
    "            if next_page:\n",
    "                current_params[\"page\"] = next_page\n",
    "\n",
    "            response = client._make_request(\"usage_report/messages\", current_params)\n",
    "            page_count += 1\n",
    "\n",
    "            # Process this page's data\n",
    "            for bucket in response.get(\"data\", []):\n",
    "                date = bucket[\"starting_at\"][:10]\n",
    "                for result in bucket[\"results\"]:\n",
    "                    rows.append(\n",
    "                        {\n",
    "                            \"date\": date,\n",
    "                            \"model\": result.get(\"model\", \"\"),\n",
    "                            \"service_tier\": result.get(\"service_tier\", \"\"),\n",
    "                            \"workspace_id\": result.get(\"workspace_id\", \"\"),\n",
    "                            \"uncached_input_tokens\": result.get(\"uncached_input_tokens\", 0),\n",
    "                            \"output_tokens\": result.get(\"output_tokens\", 0),\n",
    "                            \"cache_creation_tokens\": (\n",
    "                                result.get(\"cache_creation\", {}).get(\"ephemeral_1h_input_tokens\", 0)\n",
    "                                + result.get(\"cache_creation\", {}).get(\n",
    "                                    \"ephemeral_5m_input_tokens\", 0\n",
    "                                )\n",
    "                            ),\n",
    "                            \"cache_read_tokens\": result.get(\"cache_read_input_tokens\", 0),\n",
    "                            \"web_search_requests\": result.get(\"server_tool_use\", {}).get(\n",
    "                                \"web_search_requests\", 0\n",
    "                            ),\n",
    "                        }\n",
    "                    )\n",
    "\n",
    "            # Check if there's more data\n",
    "            if not response.get(\"has_more\", False):\n",
    "                break\n",
    "\n",
    "            next_page = response.get(\"next_page\")\n",
    "            if not next_page:\n",
    "                break\n",
    "\n",
    "        # Write CSV\n",
    "        if rows:\n",
    "            with open(output_file, \"w\", newline=\"\") as csvfile:\n",
    "                writer = csv.DictWriter(csvfile, fieldnames=rows[0].keys())\n",
    "                writer.writeheader()\n",
    "                writer.writerows(rows)\n",
    "\n",
    "            print(f\"✅ Exported {len(rows)} rows to {output_file}\")\n",
    "        else:\n",
    "            print(f\"No usage data to export for the last {days_back} days\")\n",
    "            print(\"💡 Try increasing days_back or check if you have recent API usage\")\n",
    "\n",
    "    except Exception as e:\n",
    "        print(f\"❌ Export failed: {e}\")\n",
    "\n",
    "\n",
    "def export_costs_to_csv(client, output_file=\"cost_data.csv\", days_back=30):\n",
    "    \"\"\"Export cost data to CSV.\"\"\"\n",
    "\n",
    "    end_time = datetime.combine(datetime.utcnow(), time.min)\n",
    "    start_time = end_time - timedelta(days=days_back)\n",
    "\n",
    "    params = {\n",
    "        \"starting_at\": start_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n",
    "        \"ending_at\": end_time.strftime(\"%Y-%m-%dT%H:%M:%SZ\"),\n",
    "        \"group_by[]\": [\"workspace_id\", \"description\"],\n",
    "    }\n",
    "\n",
    "    try:\n",
    "        # Collect all data across pages\n",
    "        rows = []\n",
    "        page_count = 0\n",
    "        max_pages = 20\n",
    "        next_page = None\n",
    "\n",
    "        while page_count < max_pages:\n",
    "            current_params = params.copy()\n",
    "            if next_page:\n",
    "                current_params[\"page\"] = next_page\n",
    "\n",
    "            response = client._make_request(\"cost_report\", current_params)\n",
    "            page_count += 1\n",
    "\n",
    "            # Process this page's data\n",
    "            for bucket in response.get(\"data\", []):\n",
    "                date = bucket[\"starting_at\"][:10]\n",
    "                for result in bucket[\"results\"]:\n",
    "                    # Handle both string and numeric amounts\n",
    "                    amount = result.get(\"amount\", 0)\n",
    "                    if isinstance(amount, str):\n",
    "                        try:\n",
    "                            amount = float(amount)\n",
    "                        except (ValueError, TypeError):\n",
    "                            amount = 0\n",
    "\n",
    "                    rows.append(\n",
    "                        {\n",
    "                            \"date\": date,\n",
    "                            \"workspace_id\": result.get(\n",
    "                                \"workspace_id\", \"\"\n",
    "                            ),  # null for default workspace\n",
    "                            \"description\": result.get(\"description\", \"\"),\n",
    "                            \"currency\": result.get(\"currency\", \"USD\"),\n",
    "                            \"amount_usd\": amount / 100,\n",
    "                        }\n",
    "                    )\n",
    "\n",
    "            # Check if there's more data\n",
    "            if not response.get(\"has_more\", False):\n",
    "                break\n",
    "\n",
    "            next_page = response.get(\"next_page\")\n",
    "            if not next_page:\n",
    "                break\n",
    "\n",
    "        if rows:\n",
    "            with open(output_file, \"w\", newline=\"\") as csvfile:\n",
    "                writer = csv.DictWriter(csvfile, fieldnames=rows[0].keys())\n",
    "                writer.writeheader()\n",
    "                writer.writerows(rows)\n",
    "\n",
    "            print(f\"✅ Exported {len(rows)} cost records to {output_file}\")\n",
    "        else:\n",
    "            print(f\"No cost data to export for the last {days_back} days\")\n",
    "            print(\"💡 Try increasing days_back or check if you have recent API usage\")\n",
    "\n",
    "    except Exception as e:\n",
    "        print(f\"❌ Cost export failed: {e}\")\n",
    "\n",
    "\n",
    "# Example usage\n",
    "if client:\n",
    "    export_usage_to_csv(client, \"my_usage_data.csv\", days_back=14)\n",
    "    export_costs_to_csv(client, \"my_cost_data.csv\", days_back=14)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5327a1c5",
   "metadata": {
    "id": "5327a1c5"
   },
   "source": [
    "## Wrapping Up\n",
    "\n",
    "This cookbook covers the essential patterns for working with the Usage & Cost Admin API:\n",
    "\n",
    "- **Basic queries** for usage and cost data\n",
    "- **Grouping and filtering** for detailed analysis  \n",
    "- **Pagination** for large datasets\n",
    "- **Cost description parsing** for categorization\n",
    "- **Common gotchas** to avoid issues\n",
    "- **Simple CSV export** for external tools\n",
    "\n",
    "### Next Steps\n",
    "\n",
    "- Check the [official API documentation](https://docs.claude.com) for the latest field definitions\n",
    "- Test your integration with small date ranges first\n",
    "- Consider data retention needs for your use case\n",
    "- Monitor for new API features that may enhance your analysis\n",
    "\n",
    "### Important Notes\n",
    "\n",
    "- Field names and available options may evolve as the API matures\n",
    "- Always handle unknown values gracefully in production code\n",
    "- The API is designed for historical analysis, not real-time monitoring\n",
    "- Priority Tier costs use a different billing model and don't appear in cost endpoints\n",
    "\n",
    "Happy analyzing! 📊"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "provenance": []
  },
  "kernelspec": {
   "display_name": "base",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}